[GLUTEN-11588][FLINK] Support rocksdb state for window operator #11589

PHILO-HE merged 1 commit into apache:main from KevinyhZou:support_rocksdb_state on Mar 6, 2026

@KevinyhZou KevinyhZou commented Feb 9, 2026

What changes are proposed in this pull request?

Support RocksDB state for the window operator.

This PR relies on bigo-sg/velox4j#21 and bigo-sg/velox#21.

How was this patch tested?

Manual tests.

Was this patch authored or co-authored using generative AI tooling?

Related issue: #11588

@KevinyhZou KevinyhZou marked this pull request as draft February 9, 2026 08:25

**Get & compile RocksDB**
```bash
wget https://github.com/ververica/frocksdb/archive/refs/heads/FRocksDB-6.20.3.zip
```

Please add the unzip command.

@KevinyhZou KevinyhZou changed the title from "[FLINK][GLUTEN-11588]Support rocksdb state for window operator" to "[GLUTEN-11588][FLINK]Support rocksdb state for window operator" Feb 10, 2026
@KevinyhZou KevinyhZou marked this pull request as ready for review February 10, 2026 07:26
@KevinyhZou KevinyhZou force-pushed the support_rocksdb_state branch from 5762e8c to 4a753a2 February 10, 2026 07:31
inClass,
outClass,
sourceOperator.getDescription());
if (sourceOperator instanceof WindowAggOperator) {

This should use if-else:

if (sourceOperator instanceof WindowAggOperator) {
    // create new windowAggOperator
    offloadedOpConfig.setStreamOperator(newOneInputOp);
} else {
    // the default handle
    offloadedOpConfig.setStreamOperator(newOneInputOp);
}

inClass,
outClass,
sourceOperator.getDescription());
if (sourceOperator instanceof WindowAggOperator) {

@lgbo-ustc lgbo-ustc Feb 28, 2026

Let's make the code simpler:

  1. Add a new method in GlutenOneInputOperator:
     public <NIN, NOUT> GlutenOneInputOperator<NIN, NOUT> cloneWithInputOutputClasses(
         Class<NIN> newInClass, Class<NOUT> newOutClass) {
       return new GlutenOneInputOperator<>(
           this.glutenPlan,
           this.id,
           this.inputType,
           this.outputTypes,
           newInClass,
           newOutClass,
           this.description);
     }
  2. Override cloneWithInputOutputClasses in WindowAggOperator:
     @Override
     public <NIN, NOUT> WindowAggOperator<NIN, NOUT, W> cloneWithInputOutputClasses(
         Class<NIN> newInClass, Class<NOUT> newOutClass) {
       return new WindowAggOperator<>(
           getPlanNode(),
           getId(),
           getInputType(),
           getOutputTypes(),
           newInClass,
           newOutClass,
           getDescription(),
           keyType,
           accNames,
           accTypes);
     }
  3. Then OffloadedJobGraphGenerator only needs the following:
     GlutenOneInputOperator<?, ?> newOneInputOp =
         sourceOperator.cloneWithInputOutputClasses(inClass, outClass);
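
To illustrate why this suggestion removes the need for an instanceof check, here is a minimal, self-contained sketch of the same pattern. The classes `BaseOperator` and `WindowOperator` and their fields are hypothetical, simplified stand-ins for GlutenOneInputOperator and WindowAggOperator, not the real Gluten classes. The point is only that a call through the base type dispatches to the subclass override, so the caller gets back the right concrete operator with its extra state preserved.

```java
// Sketch only: BaseOperator and WindowOperator are hypothetical stand-ins,
// not the actual Gluten operator classes.
public class CloneSketch {

  /** Stand-in for the generic one-input operator. */
  public static class BaseOperator<IN, OUT> {
    public final String description;
    public final Class<IN> inClass;
    public final Class<OUT> outClass;

    public BaseOperator(String description, Class<IN> inClass, Class<OUT> outClass) {
      this.description = description;
      this.inClass = inClass;
      this.outClass = outClass;
    }

    /** Copies this operator, swapping only the input/output classes. */
    public <NIN, NOUT> BaseOperator<NIN, NOUT> cloneWithInputOutputClasses(
        Class<NIN> newInClass, Class<NOUT> newOutClass) {
      return new BaseOperator<>(description, newInClass, newOutClass);
    }
  }

  /** Stand-in for the window operator, which carries extra state (keyType). */
  public static class WindowOperator<IN, OUT> extends BaseOperator<IN, OUT> {
    public final String keyType;

    public WindowOperator(
        String description, Class<IN> inClass, Class<OUT> outClass, String keyType) {
      super(description, inClass, outClass);
      this.keyType = keyType;
    }

    /** Covariant override: the copy keeps the window-specific field. */
    @Override
    public <NIN, NOUT> WindowOperator<NIN, NOUT> cloneWithInputOutputClasses(
        Class<NIN> newInClass, Class<NOUT> newOutClass) {
      return new WindowOperator<>(description, newInClass, newOutClass, keyType);
    }
  }

  public static void main(String[] args) {
    // The caller only knows the base type, as in the job graph generator.
    BaseOperator<?, ?> source =
        new WindowOperator<>("window-agg", String.class, String.class, "BIGINT");

    // No instanceof branch: dynamic dispatch selects the override.
    BaseOperator<?, ?> copy = source.cloneWithInputOutputClasses(Integer.class, Long.class);

    System.out.println(copy.getClass().getSimpleName()); // prints "WindowOperator"
  }
}
```

With this shape, adding another operator subclass later only requires another override; the calling code stays unchanged.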

@KevinyhZou

@PHILO-HE could you help review this PR? Thanks.

@PHILO-HE PHILO-HE changed the title from "[GLUTEN-11588][FLINK]Support rocksdb state for window operator" to "[GLUTEN-11588][FLINK] Support rocksdb state for window operator" Mar 5, 2026

@PHILO-HE PHILO-HE left a comment

Some minor comments. Thanks.

sudo .github/workflows/util/install-flink-resources.sh
git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git
cd velox4j && git reset --hard 288d181a1b05c47f1f17339eb498dd6375f7aec8
git clone -b fix_state_init https://github.com/KevinyhZou/velox4j.git

I recommend using the organization repo. Please also keep it consistent with the commit ID provided in Flink.md.

}

function install_rocksdb {
wget_and_untar https://github.com/ververica/frocksdb/archive/refs/heads/FRocksDB-${FROCKSDB_VERSION}.zip "" zip

For downloading GitHub repo code, it may be better to add and use a github_checkout function and keep wget_and_untar unchanged.

https://github.com/facebookincubator/velox/blob/7bc1be16af78637be17ee4950a68b43250c3d44d/scripts/setup-helper-functions.sh#L53

@KevinyhZou KevinyhZou force-pushed the support_rocksdb_state branch from 9843cfb to 3222fa8 March 6, 2026 04:13
@KevinyhZou KevinyhZou force-pushed the support_rocksdb_state branch from ac188da to 889b14f March 6, 2026 06:08
@KevinyhZou

Fixed.


@PHILO-HE PHILO-HE left a comment

Thanks.

@PHILO-HE PHILO-HE merged commit 05debde into apache:main Mar 6, 2026
4 checks passed